1package com.lectra.kafka.stream.example
2
3import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
4import org.apache.kafka.streams._
5import org.scalatest.flatspec.AnyFlatSpec
6import org.scalatest.matchers.should.Matchers
7import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, GivenWhenThen}
8
9import java.io.File
10import java.util.UUID
11
/**
 * Tests the topology built by `KafkaStreamSelectKey.topology` using a
 * [[org.apache.kafka.streams.TopologyTestDriver]], i.e. without a running broker.
 *
 * A fresh driver (with its own state directory) is created before each test and
 * closed after it, so tests do not share any state.
 */
class KafkaStreamSelectKeyTest extends AnyFlatSpec with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with GivenWhenThen {

  private val stringSerializer = new StringSerializer()
  private val stringDeserializer = new StringDeserializer()

  // Re-assigned in beforeEach for every test; null until the first beforeEach completes.
  private var driver: TopologyTestDriver = _
  private var inputTopic: TestInputTopic[String, String] = _
  private var outputTopic: TestOutputTopic[String, String] = _

  /**
   * Creates a uniquely named directory under `java.io.tmpdir`.
   *
   * Each call creates a new directory. It is registered with `deleteOnExit`,
   * which only removes the directory at JVM shutdown if it is empty by then.
   *
   * @return the newly created directory
   */
  private def tempDir: File = {
    val ioDir = System.getProperty("java.io.tmpdir")
    val f = new File(ioDir, "kafka-" + UUID.randomUUID().toString)
    f.mkdirs()
    f.deleteOnExit()
    f
  }

  /**
   * Builds the topology under test by letting `KafkaStreamSelectKey.topology`
   * register its processing steps on a fresh builder.
   *
   * @return the built topology
   */
  private def buildTopology(): Topology = {
    import org.apache.kafka.streams.scala.StreamsBuilder
    val builder = new StreamsBuilder
    KafkaStreamSelectKey.topology(builder)
    builder.build()
  }

  /** Creates a fresh driver and the input/output test topics before every test. */
  override def beforeEach(): Unit = {
    super.beforeEach() // keep stacked BeforeAndAfterEach traits working
    // The state dir must be set on the very config handed to the driver below
    // (the original code set it on KafkaStreamAvro.config, which this test never uses).
    val props = KafkaStreamSelectKey.config
    props.put(StreamsConfig.STATE_DIR_CONFIG, tempDir.getAbsolutePath)
    driver = new TopologyTestDriver(buildTopology(), props)
    inputTopic = driver.createInputTopic(KafkaStreamSelectKey.topicIn, stringSerializer, stringSerializer)
    outputTopic = driver.createOutputTopic(KafkaStreamSelectKey.topicChangedKey, stringDeserializer, stringDeserializer)
  }

  /** Closes the driver created in `beforeEach`, if any. */
  override def afterEach(): Unit = {
    try {
      // driver is still null if beforeEach failed before creating it; avoid
      // masking that failure with a NullPointerException here.
      Option(driver).foreach(_.close())
      driver = null
    } finally {
      super.afterEach()
    }
  }

  "Nominal case for select" should "change the key of records by combining key and value with -" in {
    val key = "mykey"
    val value = "myvalue"
    val key2 = "yourkey"
    val value2 = "yourvalue"

    inputTopic.pipeInput(key, value)
    inputTopic.pipeInput(key2, value2)

    // Expected output: the value is unchanged and the new key is "<key>-<value>".
    val expectedKey1 = s"$key-$value"
    val expectedKey2 = s"$key2-$value2"

    outputTopic.getQueueSize shouldBe 2
    // Records are read back in the order they were piped in.
    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey1, value)
    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey2, value2)
  }

}